//--------------------------------------------------------------------------
//
//  Copyright (c) Microsoft Corporation.  All rights reserved.
//
//  File: IOCompletionPortTaskScheduler.cs
//
//--------------------------------------------------------------------------

using System.Collections.Concurrent;
using System.Collections.Generic;
using System.ComponentModel;
using System.Runtime.InteropServices;
using Microsoft.Win32.SafeHandles;

namespace System.Threading.Tasks.Schedulers
{
    /// <summary>Provides a TaskScheduler that uses an I/O completion port for concurrency control.</summary>
    /// <remarks>
    /// Tasks are stored in a managed queue; one empty completion packet is posted to the port for
    /// every queued task. A fixed set of background threads blocks on the port and each wake-up
    /// dequeues and runs one task. The port is created with a concurrency value, which is what
    /// limits how many of those threads the operating system lets run at the same time.
    /// </remarks>
    public sealed class IOCompletionPortTaskScheduler : TaskScheduler, IDisposable
    {
        /// <summary>The queue of tasks to be scheduled.</summary>
        private readonly ConcurrentQueue<Task> m_tasks;
        /// <summary>The I/O completion port to use for concurrency control.</summary>
        private readonly IOCompletionPort m_iocp;
        /// <summary>Whether the current thread is a scheduler thread.</summary>
        private readonly ThreadLocal<bool> m_schedulerThread;
        /// <summary>Event used to wait for all threads to shutdown.</summary>
        private readonly CountdownEvent m_remainingThreadsToShutdown;
        /// <summary>Set to 1 by the first call to Dispose; makes later calls no-ops.</summary>
        private int m_disposed;

        /// <summary>Initializes the IOCompletionPortTaskScheduler.</summary>
        /// <param name="maxConcurrencyLevel">The maximum number of threads in the scheduler to be executing concurrently.</param>
        /// <param name="numAvailableThreads">The number of threads to have available in the scheduler for executing tasks.</param>
        /// <exception cref="ArgumentOutOfRangeException">Either argument is less than 1.</exception>
        /// <exception cref="Win32Exception">The I/O completion port could not be created.</exception>
        public IOCompletionPortTaskScheduler(int maxConcurrencyLevel, int numAvailableThreads)
        {
            // Validate arguments. These are range violations, not null references, so report
            // them the same way the nested IOCompletionPort constructor does.
            if (maxConcurrencyLevel < 1) throw new ArgumentOutOfRangeException("maxConcurrencyLevel");
            if (numAvailableThreads < 1) throw new ArgumentOutOfRangeException("numAvailableThreads");

            m_tasks = new ConcurrentQueue<Task>();
            m_iocp = new IOCompletionPort(maxConcurrencyLevel);
            m_schedulerThread = new ThreadLocal<bool>();
            m_remainingThreadsToShutdown = new CountdownEvent(numAvailableThreads);

            // Create and start the threads
            for (int i = 0; i < numAvailableThreads; i++)
            {
                new Thread(() =>
                {
                    try
                    {
                        // Note that this is a scheduler thread.  Used for inlining checks.
                        m_schedulerThread.Value = true;

                        // Continually wait on the I/O completion port until
                        // there's a work item, then process it.  WaitOne returns
                        // false once the port has been closed, which ends the loop.
                        while (m_iocp.WaitOne())
                        {
                            Task next;
                            if (m_tasks.TryDequeue(out next)) TryExecuteTask(next);
                        }
                    }
                    // Always signal, even on failure, so Dispose's Wait cannot hang on this thread.
                    finally { m_remainingThreadsToShutdown.Signal(); }
                }) { IsBackground = true }.Start();
            }
        }

        /// <summary>Dispose of the scheduler.</summary>
        /// <remarks>
        /// Blocks until every scheduler thread has exited. Calls after the first are ignored.
        /// </remarks>
        public void Dispose()
        {
            // Only the first caller performs the shutdown; the CountdownEvent and ThreadLocal
            // would otherwise throw ObjectDisposedException on a second pass.
            if (Interlocked.Exchange(ref m_disposed, 1) != 0) return;

            // Close the I/O completion port.  This will cause any threads blocked
            // waiting for items to wake up.
            m_iocp.Dispose();

            // Wait for all threads to shutdown.  This could cause deadlock
            // if the current thread is calling Dispose or is part of such a cycle.
            m_remainingThreadsToShutdown.Wait();
            m_remainingThreadsToShutdown.Dispose();

            // Clean up remaining state
            m_schedulerThread.Dispose();
        }

        /// <summary>Gets a list of all tasks scheduled to this scheduler.</summary>
        /// <returns>An enumerable of all scheduled tasks.</returns>
        protected override IEnumerable<Task> GetScheduledTasks() { return m_tasks.ToArray(); }

        /// <summary>Queues a task to this scheduler for execution.</summary>
        /// <param name="task">The task to be executed.</param>
        protected override void QueueTask(Task task)
        {
            // Store the task and let the I/O completion port know that more work has arrived.
            // The task must be enqueued first so the woken thread finds it.
            m_tasks.Enqueue(task);
            m_iocp.NotifyOne();
        }

        /// <summary>Try to execute a task on the current thread.</summary>
        /// <param name="task">The task to execute.</param>
        /// <param name="taskWasPreviouslyQueued">Whether the task was previously queued to this scheduler.</param>
        /// <returns>Whether the task was executed.</returns>
        protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
        {
            // Only inline from scheduler threads.  This is to ensure concurrency control
            // is able to handle inlining as well.
            return m_schedulerThread.Value && TryExecuteTask(task);
        }

        /// <summary>Provides a simple managed wrapper for an I/O completion port.</summary>
        private sealed class IOCompletionPort : IDisposable
        {
            /// <summary>Infinite timeout value to use for GetQueuedCompletedStatus.</summary>
            private const UInt32 INFINITE_TIMEOUT = unchecked((UInt32)Timeout.Infinite);
            /// <summary>Win32 error reported when the port is closed while a thread is waiting on it.</summary>
            private const int ERROR_ABANDONED_WAIT_0 = 735;
            /// <summary>Win32 error reported when the handle passed to the wait is no longer valid.</summary>
            private const int ERROR_INVALID_HANDLE = 6;
            /// <summary>An invalid file handle value.</summary>
            private static readonly IntPtr INVALID_FILE_HANDLE = new IntPtr(-1);
            /// <summary>An invalid I/O completion port handle value.</summary>
            private static readonly IntPtr INVALID_IOCP_HANDLE = IntPtr.Zero;

            /// <summary>The I/O completion port handle.</summary>
            private readonly SafeFileHandle m_handle;

            /// <summary>Initializes the I/O completion port.</summary>
            /// <param name="maxConcurrencyLevel">The maximum concurrency level allowed by the I/O completion port.</param>
            /// <exception cref="ArgumentOutOfRangeException"><paramref name="maxConcurrencyLevel"/> is less than 1.</exception>
            /// <exception cref="Win32Exception">The operating system failed to create the port.</exception>
            public IOCompletionPort(Int32 maxConcurrencyLevel)
            {
                // Validate the argument and create the port.
                if (maxConcurrencyLevel < 1) throw new ArgumentOutOfRangeException("maxConcurrencyLevel");
                m_handle = CreateIoCompletionPort(INVALID_FILE_HANDLE, INVALID_IOCP_HANDLE, UIntPtr.Zero, (UInt32)maxConcurrencyLevel);

                // CreateIoCompletionPort returns NULL on failure; surface that here rather than
                // letting every later post/wait fail against a dead handle.
                if (m_handle.IsInvalid)
                {
                    // Capture the error before any other call can overwrite it.
                    int errorCode = Marshal.GetLastWin32Error();
                    m_handle.Dispose();
                    throw new Win32Exception(errorCode);
                }
            }

            /// <summary>Clean up.</summary>
            public void Dispose() { m_handle.Dispose(); }

            /// <summary>Notify that I/O completion port that new work is available.</summary>
            /// <exception cref="Win32Exception">The completion packet could not be posted.</exception>
            public void NotifyOne()
            {
                // The packet carries no payload; its arrival alone is the wake-up signal.
                if (!PostQueuedCompletionStatus(m_handle, 0, UIntPtr.Zero, IntPtr.Zero))
                    throw new Win32Exception();
            }

            /// <summary>Waits for an item on the I/O completion port.</summary>
            /// <returns>true if an item was available; false if the completion port closed before an item could be retrieved.</returns>
            /// <exception cref="Win32Exception">The wait failed for a reason other than the port being closed.</exception>
            public bool WaitOne()
            {
                // Wait for an item to be posted.
                // DangerousGetHandle is used so that the safe handle can be closed even while blocked in the call to GetQueuedCompletionStatus.
                UInt32 lpNumberOfBytes;
                IntPtr lpCompletionKey, lpOverlapped;
                if (!GetQueuedCompletionStatus(m_handle.DangerousGetHandle(), out lpNumberOfBytes, out lpCompletionKey, out lpOverlapped, INFINITE_TIMEOUT))
                {
                    int errorCode = Marshal.GetLastWin32Error();

                    // Both codes are treated as "the port was closed", i.e. a normal shutdown.
                    if (errorCode == ERROR_ABANDONED_WAIT_0 || errorCode == ERROR_INVALID_HANDLE)
                        return false;

                    throw new Win32Exception(errorCode);
                }
                return true;
            }

            /// <summary>
            /// Creates an input/output (I/O) completion port and associates it with a specified file handle,
            /// or creates an I/O completion port that is not yet associated with a file handle, allowing association at a later time.
            /// </summary>
            /// <param name="fileHandle">An open file handle or INVALID_HANDLE_VALUE.</param>
            /// <param name="existingCompletionPort">A handle to an existing I/O completion port or NULL.</param>
            /// <param name="completionKey">The per-handle user-defined completion key that is included in every I/O completion packet for the specified file handle.</param>
            /// <param name="numberOfConcurrentThreads">The maximum number of threads that the operating system can allow to concurrently process I/O completion packets for the I/O completion port.</param>
            /// <returns>If the function succeeds, the return value is the handle to an I/O completion port.  If the function fails, the return value is NULL.</returns>
            [DllImport("kernel32.dll", SetLastError = true)]
            private static extern SafeFileHandle CreateIoCompletionPort(
                IntPtr fileHandle, IntPtr existingCompletionPort, UIntPtr completionKey, UInt32 numberOfConcurrentThreads);

            /// <summary>Attempts to dequeue an I/O completion packet from the specified I/O completion port.</summary>
            /// <param name="completionPort">A handle to the completion port.</param>
            /// <param name="lpNumberOfBytes">A pointer to a variable that receives the number of bytes transferred during an I/O operation that has completed.</param>
            /// <param name="lpCompletionKey">A pointer to a variable that receives the completion key value associated with the file handle whose I/O operation has completed.</param>
            /// <param name="lpOverlapped">A pointer to a variable that receives the address of the OVERLAPPED structure that was specified when the completed I/O operation was started.</param>
            /// <param name="dwMilliseconds">The number of milliseconds that the caller is willing to wait for a completion packet to appear at the completion port.</param>
            /// <returns>Returns nonzero (TRUE) if successful or zero (FALSE) otherwise.</returns>
            [DllImport("kernel32.dll", SetLastError = true)]
            private static extern Boolean GetQueuedCompletionStatus(
                IntPtr completionPort, out UInt32 lpNumberOfBytes, out IntPtr lpCompletionKey, out IntPtr lpOverlapped, UInt32 dwMilliseconds);

            /// <summary>Posts an I/O completion packet to an I/O completion port.</summary>
            /// <param name="completionPort">A handle to the completion port.</param>
            /// <param name="dwNumberOfBytesTransferred">The value to be returned through the lpNumberOfBytesTransferred parameter of the GetQueuedCompletionStatus function.</param>
            /// <param name="dwCompletionKey">The value to be returned through the lpCompletionKey parameter of the GetQueuedCompletionStatus function.</param>
            /// <param name="lpOverlapped">The value to be returned through the lpOverlapped parameter of the GetQueuedCompletionStatus function.</param>
            /// <returns>If the function succeeds, the return value is nonzero. If the function fails, the return value is zero.</returns>
            [DllImport("kernel32.dll", SetLastError = true)]
            private static extern Boolean PostQueuedCompletionStatus(
                SafeFileHandle completionPort, UInt32 dwNumberOfBytesTransferred, UIntPtr dwCompletionKey, IntPtr lpOverlapped);
        }
    }
}